Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request enhances the system's capabilities by integrating the OpenAI Responses API, offering a new way to process and generate responses from OpenAI models. It also introduces a global streaming-output configuration, allowing various parts of the application, such as webchat title generation and cron tasks, to use real-time streaming for improved performance and user experience.
Code Review
This pull request introduces a new OpenAI Responses API provider, including a new tool-schema conversion method, a new provider configuration, and a new provider source file. It also updates the main agent to support streaming responses for title generation and adjusts the cron manager configuration. The review comments highlight several areas for improvement:

- a potential bug in _collect_streamed_response, where streamed title generation might return incomplete text;
- significant duplication of the retry logic between text_chat and text_chat_stream in the new OpenAI Responses provider;
- code duplication between openai_responses_schema and openai_schema in tool.py;
- an unnecessary try...except block in _handle_webchat;
- duplicated request-parameter preparation in _query and _query_stream;
- a side effect in _remove_image_from_context that modifies input parameters directly.
```python
async def _collect_streamed_response() -> LLMResponse | None:
    full_text = ""
    last_resp: LLMResponse | None = None
    async for resp in prov.text_chat_stream(
        system_prompt=system_prompt,
        prompt=prompt,
    ):
        last_resp = resp
        if resp.is_chunk:
            chunk_text = resp.completion_text
            if chunk_text:
                full_text += chunk_text
    if last_resp and last_resp.completion_text:
        return last_resp
    if full_text:
        return LLMResponse("assistant", completion_text=full_text)
    return last_resp
```
There is a potential bug in the stream-handling logic of _collect_streamed_response. When the stream ends, the line `if last_resp and last_resp.completion_text:` may incorrectly return the last chunk rather than the full response text.

For example, if the streamed response consists of multiple chunks ("A", "B", "C"), last_resp will be the final chunk "C". Since last_resp.completion_text is "C", the condition holds and the function returns "C", producing an incomplete title.

To fix this, either prefer the concatenated full_text, or only return last_resp when it is not a chunk.
Suggested change:

```python
async def _collect_streamed_response() -> LLMResponse | None:
    full_text = ""
    last_resp: LLMResponse | None = None
    async for resp in prov.text_chat_stream(
        system_prompt=system_prompt,
        prompt=prompt,
    ):
        last_resp = resp
        if resp.is_chunk:
            chunk_text = resp.completion_text
            if chunk_text:
                full_text += chunk_text
    if last_resp and not last_resp.is_chunk:
        return last_resp
    if full_text:
        return LLMResponse("assistant", completion_text=full_text)
    return last_resp
```
```python
async def text_chat_stream(
    self,
    prompt=None,
    session_id=None,
    image_urls=None,
    func_tool=None,
    contexts=None,
    system_prompt=None,
    tool_calls_result=None,
    model=None,
    **kwargs,
) -> AsyncGenerator[LLMResponse, None]:
    payloads, context_query = await self._prepare_chat_payload(
        prompt,
        image_urls,
        contexts,
        system_prompt,
        tool_calls_result,
        model=model,
        **kwargs,
    )

    max_retries = 10
    available_api_keys = self.api_keys.copy()
    chosen_key = random.choice(available_api_keys)
    image_fallback_used = False

    last_exception = None
    retry_cnt = 0
    for retry_cnt in range(max_retries):
        try:
            self.client.api_key = chosen_key
            async for response in self._query_stream(payloads, func_tool):
                yield response
            break
        except Exception as e:
            last_exception = e
            (
                success,
                chosen_key,
                available_api_keys,
                payloads,
                context_query,
                func_tool,
                image_fallback_used,
            ) = await self._handle_api_error(
                e,
                payloads,
                context_query,
                func_tool,
                chosen_key,
                available_api_keys,
                retry_cnt,
                max_retries,
                image_fallback_used=image_fallback_used,
            )
            if success:
                break

    if retry_cnt == max_retries - 1:
        logger.error(f"API 调用失败,重试 {max_retries} 次仍然失败。")
        if last_exception is None:
            raise Exception("未知错误")
        raise last_exception
```
The retry loop in text_chat_stream duplicates the corresponding logic in text_chat almost verbatim: key selection, error handling via _handle_api_error, and the final failure path. Consider extracting the shared retry flow into a helper so the two entry points stay in sync.
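A minimal sketch of one possible extraction, assuming text_chat builds its payload the same way (the helper name _retry_query_stream and its exact signature are hypothetical, not part of the PR):

```python
import random
from collections.abc import AsyncGenerator


# Hypothetical helper inside the provider class: the shared retry loop,
# expressed as an async generator. text_chat_stream would yield from it
# directly; text_chat could drain it (or a variant built on self._query)
# and return only the final response.
async def _retry_query_stream(
    self, payloads, context_query, func_tool
) -> AsyncGenerator["LLMResponse", None]:
    max_retries = 10
    available_api_keys = self.api_keys.copy()
    chosen_key = random.choice(available_api_keys)
    image_fallback_used = False
    last_exception = None
    for retry_cnt in range(max_retries):
        try:
            self.client.api_key = chosen_key
            async for response in self._query_stream(payloads, func_tool):
                yield response
            return  # success: end the generator
        except Exception as e:
            last_exception = e
            (
                success,
                chosen_key,
                available_api_keys,
                payloads,
                context_query,
                func_tool,
                image_fallback_used,
            ) = await self._handle_api_error(
                e,
                payloads,
                context_query,
                func_tool,
                chosen_key,
                available_api_keys,
                retry_cnt,
                max_retries,
                image_fallback_used=image_fallback_used,
            )
            if success:
                return
    # All retries exhausted: surface the last error, as the current code does.
    logger.error(f"API 调用失败,重试 {max_retries} 次仍然失败。")
    if last_exception is None:
        raise Exception("未知错误")
    raise last_exception
```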
```python
def openai_responses_schema(
    self, omit_empty_parameter_field: bool = False
) -> list[dict]:
    """Convert tools to OpenAI Responses API schema format.

    Note: Responses API expects top-level `name` instead of nested `function.name`.
    """
    result = []
    for tool in self.tools:
        func_def = {"type": "function", "name": tool.name}
        if tool.description:
            func_def["description"] = tool.description

        if tool.parameters is not None:
            if (
                tool.parameters and tool.parameters.get("properties")
            ) or not omit_empty_parameter_field:
                func_def["parameters"] = tool.parameters

        result.append(func_def)
    return result
```
This new openai_responses_schema method shares a large amount of code with the existing openai_schema method. To improve maintainability and follow the DRY (Don't Repeat Yourself) principle, consider refactoring.

One option is to have openai_responses_schema reuse the logic of openai_schema and then transform the result into the Responses API format.
```python
def openai_responses_schema(
    self, omit_empty_parameter_field: bool = False
) -> list[dict]:
    """Convert tools to OpenAI Responses API schema format.

    Note: Responses API expects top-level `name` instead of nested `function.name`.
    """
    # Reuse the openai_schema logic and transform its output
    openai_tools = self.openai_schema(omit_empty_parameter_field)
    responses_tools = []
    for tool in openai_tools:
        func_details = tool.get("function", {})
        responses_tool = {"type": "function", "name": func_details.get("name")}
        if "description" in func_details:
            responses_tool["description"] = func_details["description"]
        if "parameters" in func_details:
            responses_tool["parameters"] = func_details["parameters"]
        responses_tools.append(responses_tool)
    return responses_tools
```

This eliminates the duplicated code: if the tool-schema generation logic needs to change in the future, it only has to be updated in one place.
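For reference, a minimal illustration of the shape difference the conversion bridges; the get_weather tool and its fields are invented for this example:

```python
# Chat Completions format, as produced by openai_schema:
# fields nested under "function".
chat_completions_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Look up the weather for a city.",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
        },
    },
}

# Responses API format, as produced by openai_responses_schema:
# name/description/parameters at the top level.
responses_tool = {
    "type": "function",
    "name": "get_weather",
    "description": "Look up the weather for a city.",
    "parameters": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
    },
}
```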
```python
streaming_enabled = False
try:
    streaming_enabled = bool(
        getattr(prov, "provider_settings", {}).get("streaming_response", False)
    )
except Exception:
    streaming_enabled = False
```
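Per the review summary, this try...except in _handle_webchat is unnecessary: getattr already supplies a {} default, and .get never raises on a dict. Assuming provider_settings is always a dict when present, the block could likely collapse to:

```python
# Simplified equivalent, assuming provider_settings is a dict when present.
streaming_enabled = bool(
    getattr(prov, "provider_settings", {}).get("streaming_response", False)
)
```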
```python
payloads.pop("abort_signal", None)

if payloads.get("store") is not False:
    logger.warning(
        "OpenAI Responses API requires store=false; overriding request store.",
    )
    payloads["store"] = False

extra_body = {}
to_del = []
for key in payloads:
    if key not in self.default_params:
        extra_body[key] = payloads[key]
        to_del.append(key)
for key in to_del:
    del payloads[key]

custom_extra_body = self.provider_config.get("custom_extra_body", {})
if isinstance(custom_extra_body, dict):
    extra_body.update(custom_extra_body)
if extra_body.get("store") is not False:
    if "store" in extra_body:
        logger.warning(
            "OpenAI Responses API requires store=false; overriding extra_body store.",
        )
    extra_body.pop("store", None)
```
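The review summary also notes that this request-parameter preparation is duplicated between _query and _query_stream. A minimal sketch of a shared helper, assuming both call sites start from the same payloads dict (the helper name _prepare_request_params is hypothetical):

```python
# Hypothetical helper inside the provider class: mutates payloads in place
# and returns the extra_body to pass alongside the request.
def _prepare_request_params(self, payloads: dict) -> dict:
    payloads.pop("abort_signal", None)

    if payloads.get("store") is not False:
        logger.warning(
            "OpenAI Responses API requires store=false; overriding request store.",
        )
        payloads["store"] = False

    # Move any parameters the API does not know about into extra_body.
    extra_body = {
        key: payloads[key]
        for key in list(payloads)
        if key not in self.default_params
    }
    for key in extra_body:
        del payloads[key]

    custom_extra_body = self.provider_config.get("custom_extra_body", {})
    if isinstance(custom_extra_body, dict):
        extra_body.update(custom_extra_body)
    if extra_body.get("store") is not False:
        if "store" in extra_body:
            logger.warning(
                "OpenAI Responses API requires store=false; overriding extra_body store.",
            )
        extra_body.pop("store", None)
    return extra_body
```

Each caller would then run extra_body = self._prepare_request_params(payloads) before issuing its request.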
```python
async def _remove_image_from_context(self, contexts: list):
    new_contexts = []

    for context in contexts:
        if "content" in context and isinstance(context["content"], list):
            new_content = []
            for item in context["content"]:
                if isinstance(item, dict) and "image_url" in item:
                    continue
                new_content.append(item)
            if not new_content:
                new_content = [{"type": "text", "text": "[图片]"}]
            context["content"] = new_content
        new_contexts.append(context)
    return new_contexts
```
When processing the contexts list, _remove_image_from_context mutates the dictionary elements in place (context["content"] = new_content). Although it returns a new list, new_contexts, the elements of the original contexts list are also modified.

This kind of side effect on input parameters makes the code's behavior hard to predict and can trigger unexpected bugs elsewhere. A good practice is to keep the function "pure": do not modify the input, only return new output.

Consider refactoring this method to copy each context dictionary instead of modifying the original data.
Suggested change:

```python
async def _remove_image_from_context(self, contexts: list):
    new_contexts = []
    for context in contexts:
        new_context = context.copy()
        if "content" in new_context and isinstance(new_context["content"], list):
            new_content = [
                item
                for item in new_context["content"]
                if not (isinstance(item, dict) and "image_url" in item)
            ]
            if not new_content:
                new_content = [{"type": "text", "text": "[图片]"}]
            new_context["content"] = new_content
        new_contexts.append(new_context)
    return new_contexts
```
1. New feature: added a new adapter, OpenAi (Responses), to support the OpenAI Responses mode.
2. New feature: the streaming-output setting (普通配置 => 其它配置 => 流式输出) now applies to future (scheduled) tasks. Note: this only configures how requests are sent to the provider; the output shown to the user is not itself streamed.
Modifications / 改动点
1. astrbot/core/provider/sources/openai_responses_source.py
   The core adapter implementation for openai_responses.
2. astrbot/core/agent/tool.py
   Provides the openai_responses_schema function for use by astrbot/core/provider/sources/openai_responses_source.py.
3. astrbot/core/provider/manager.py
   Adds the new openai_responses option to the model provider list.
4. astrbot/core/config/default.py
   Provides the default configuration for openai_responses.
5. astrbot/core/astr_main_agent.py
   Makes the streaming-output setting (普通配置 => 其它配置 => 流式输出) apply to task title generation.
   Note: this only configures how requests are sent to the provider; the output shown to the user is not itself streamed.
6. astrbot/core/cron/manager.py
   Makes the streaming-output setting (普通配置 => 其它配置 => 流式输出) apply to future (scheduled) tasks; see the sketch after this list.
   Note: this only configures how requests are sent to the provider; the output shown to the user is not itself streamed.
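A sketch of the call-site pattern these two changes introduce, based on the diff fragments above (variable names follow the diff; the exact call sites may differ):

```python
# Read the global streaming-output setting from the provider, then pick
# the streaming or non-streaming chat entry point accordingly.
streaming_enabled = bool(
    getattr(prov, "provider_settings", {}).get("streaming_response", False)
)
if streaming_enabled:
    # Drain text_chat_stream and assemble a final response
    # (see _collect_streamed_response above).
    llm_response = await _collect_streamed_response()
else:
    llm_response = await prov.text_chat(
        system_prompt=system_prompt,
        prompt=prompt,
    )
```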
Screenshots or Test Results / 运行截图或测试结果
Verification steps: enable the streaming-output setting and choose a provider that only supports streaming output.
a. Chat in webchat; check that a title is generated and replies arrive normally.
b. Ask the AI to create a future (scheduled) task; check that the task's output is produced automatically.
c. Restart the project, then click the test-connection button for a model under the openai_responses adapter.
Note: the openai_responses adapter's test cannot pick up changes to the streaming-output setting in real time, because an adapter captures the streaming state at the moment it is created. Supporting live updates would require changes to the core code, which were avoided here for safety; restarting makes the streaming-output setting take effect.
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
/ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到 requirements.txt 和 pyproject.toml 文件相应位置。
😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Add support for OpenAI Responses API as a new provider and wire streaming-response configuration through to relevant flows.